AWS Step Functionsにサーキットブレーカーパターンを組み込んでみた
AWS Step Functionsステートマシン実行中、再試行で回避できない予期しない障害が発生したときは、再試行せず、失敗系フローに遷移する必要があります。
ステートマシンをまたいで障害を管理できるよう、Amazon DynamoDBで障害を管理し、ステートマシンの安定性と回復性を向上する方法を紹介します。
サーキットブレーカーパターンをStep Functionsのステートマシンに組み込んだソリューションとも言えます。
基本的な考え方は次のAWSブログを参考にし、C#の実装をPythonに変更し、DynamoDBのTTLも使わずに実装をシンプルにしています。
Using the circuit breaker pattern with AWS Step Functions and Amazon DynamoDB | AWS Compute Blog
想定するユースケース
- ステートマシンに依存関係にあり、依存するステートマシンの実行が失敗したら、後続のステートマシンを処理したくない
- 運用のために、ステートマシンが実行されないよう、Step Functions外からコントロールしたい
といったユースケースを想定しています。
AWS Step Functionsへの組み込み
障害はDynamoDBで管理し、処理名をキーに障害発生時刻を登録し、現在時刻と比較して発生時刻が閾値内の場合、障害と判定します。
依存されるステートマシンでは、処理が失敗した時に、DynamoDBに障害情報を登録する(サーキットを中断・オープン状態にする)タスクを用意し、依存するステートマシンでは、処理の前に、障害が起きていないこと(サーキットがクローズド状態であること)を確認するタスクを用意します。
※ 図は 公式ブログから引用
Step Functionsのステートマシンは下図の通りです。
{ "StartAt": "Get Circuit Status", "States": { "Get Circuit Status": { "Next": "Is Circuit Closed?", "Type": "Task", "Comment": "Get Circuit Status", "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:GetCircuitStatus:$LATEST" }, "Is Circuit Closed?": { "Type": "Choice", "Choices": [ { "Variable": "$.CircuitStatus", "StringEquals": "OPEN", "Next": "Circuit Open" }, { "Variable": "$.CircuitStatus", "StringEquals": "CLOSED", "Next": "Execute Lambda" }, { "Variable": "$.CircuitStatus", "StringEquals": "", "Next": "Execute Lambda" } ] }, "Circuit Open": { "Type": "Fail" }, "Update Circuit Status": { "Next": "Circuit Open", "Type": "Task", "Comment": "Update Circuit Status", "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:UpdateCircuitStatus:$LATEST" }, "Execute Lambda": { "Next": "Circuit Closed", "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:JobExe:$LATEST", "Comment": "Task to execute lambda. This will set circuit status to OPEN if the execution fails for three times or the task times out", "TimeoutSeconds": 12, "Retry": [ { "BackoffRate": 1.5, "MaxAttempts": 3, "IntervalSeconds": 2, "ErrorEquals": [ "States.TaskFailed", "States.Timeout" ] } ], "Catch": [ { "ErrorEquals": [ "States.TaskFailed", "States.Timeout" ], "Next": "Update Circuit Status", "ResultPath": "$.taskresult" } ] }, "Circuit Closed": { "Type": "Succeed" } } }
以下の3つの処理があります
- 障害を確認するタスク(Get Circuit Status)
- 障害情報を登録するタスク(Update Circuit Status)
- メイン処理(Execute Lambda)
サンプルコードでは全てLambda関数で実装しています。
メイン処理は要件に合わせて変更してください。
サーキットブレーカーに関わる処理について軽く触れます。
障害情報を登録するタスク(Update Circuit Status)
メイン処理中に予期しない障害が起きた場合、DynamoDBにサービス(処理)をキーにタイムスタンプを登録します。
import boto3 import time dynamodb = boto3.resource('dynamodb') TABLE_NAME = 'CircuitStatus' KEY = 'Test' table = dynamodb.Table(TABLE_NAME) def lambda_handler(event, context): table.put_item( Item={ 'ServiceName' : KEY, 'Timestamp' : int(time.time()) } )
次の様なアイテムが登録されます。
ServiceName | Timestamp |
---|---|
Test | 1669749625 |
Step Functions外からサーキットを強制的にOPENにしたい場合、このLambda関数(Update Circuit Status
)を直接呼び出します。
障害を確認するタスク(Get Circuit Status)
DynamoDBのテーブルを確認し、障害が起きていないか確認します。
メイン処理を実行する前に、障害が起きていないかDynamoDBに確認します。
このサンプルコードでは、キーが存在し、かつタイムスタンプが障害発生から15分以内(TTL_THRESHOLD = 15 * 60 # in seconds
)であれば障害発生中と判定しています。
サーキットの状態に応じて
CLOSED
: 障害が起きていないOPEN
: 障害が起きている
のどちらかを返します。
import boto3 import time dynamodb = boto3.resource('dynamodb') TABLE_NAME = 'CircuitStatus' KEY = 'Test' table = dynamodb.Table(TABLE_NAME) def lambda_handler(event, context): TTL_THRESHOLD = 15 * 60 # in seconds TTL_THRESHOLD += time.time() res = table.get_item( Key={ 'ServiceName' : KEY } ) item = res.get('Item') if item and item['Timestamp'] < TTL_THRESHOLD: status = 'OPEN' else: status = 'CLOSED' return {'CircuitStatus' : status}
動作確認
以降では、サーキットの状態に応じたフローを確認します。
正常系(サーキットがCLOSED状態)
まずは、障害が起きておらず、メイン処理も正常に実行できた場合です。
すべてのタスクは緑色です。
この段階では、DynamoDBテーブル内は空です。
ServiceName | Timestamp | 現在時刻 |
---|---|---|
- | - | 1669749525 |
障害発生(サーキットをCLOSEDからOPENに変更)
メイン処理で障害が発生した場合、Update Circuit Status
タスクにより、DynamoDBに障害アイテムを登録し、サーキットをオープン状態にします。
ServiceName | Timestamp | 現在時刻 |
---|---|---|
Test | 1669749625 | 1669749625 |
※ Timestamp = 現在時刻
障害発生中(サーキットがOPEN状態)
障害発生中にステートマシンが呼ばれると、Get Circuit Status
タスクはCLOSED
を返し、メイン処理は呼ばれません。
ServiceName | Timestamp | 現在時刻 |
---|---|---|
Test | 1669749625 | 1669749925 |
※ Timestamp < 現在時刻 < Timestamp + TTL
障害から復帰(サーキットがOPENからCLOSEDに推移)
TTLが過ぎたあとで再度ステートマシンが呼ばれると、Get Circuit Status
タスクはOPEN
を返し、メイン処理が呼ばれます。
ServiceName | Timestamp | 現在時刻 |
---|---|---|
Test | 1669749625 | 1669750825 |
- TTL : 15分
- 現在時刻 - Timestamp = 1200 = 20分
※ Timestamp + TTL < 現在時刻
再び障害発生(サーキットをCLOSEDからOPENに変更)
再び、メイン処理で障害が発生した場合、Update Circuit Status
タスクにより、DynamoDBの既存アイテムのタイムスタンプを更新し、サーキットをオープン状態にします。
ServiceName | Timestamp | 現在時刻 |
---|---|---|
Test | 1669751125 | 1669751125 |
※ Timestamp = 現在時刻
最後に
Step FunctionsにDynamoDBとLambda関数でサーキットブレーカーを組み込む方法を紹介しました。
タスクやステートマシンで依存関係があるような処理で、例外処理を実行したいときに利用できます。
本記事の元になった記事では、同様のことがC#とCDKで実装されています。 サンプルコードがGitHubで公開されているため、一度覗いてみて下さい。
GitHub - aws-samples/circuit-breaker-netcore-blog